// Copyright (c) 2020-present,  INSPUR Co, Ltd.  All rights reserved.
// This source code is licensed under Apache 2.0 License.

#include "range_arena_rebuild.h"

#include <unordered_set>

#include "pure_mem/pmemrep.h"
#include "pure_mem/rangearena/thread_safe_sorted_list.h"

namespace rocksdb {

MultiRangeManager::MultiRangeManager(void *multi_range)
    : event_process_(&MultiRangeManager::ThreadRun, this),
      multi_range_(multi_range) {}

MultiRangeManager::~MultiRangeManager() {
  closed_ = true;
  event_process_.join();
  ((MultiRangeArena *)multi_range_)->RangeArenaClear();
}

void MultiRangeManager::ThreadRun() {
  std::this_thread::sleep_for(std::chrono::milliseconds(1));
  while (true) {
    if (closed_) {
      return;
    }
    std::shared_ptr<EVENT> cur = events_.try_pop();
    if (cur == nullptr) {
      // There are no events in the current queue。
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
      continue;
    }
    switch (cur->op) {
      case ARENA_EVENT_HANDLE: {
        Handle(cur->range_arena);
        break;
      }
      default: {
        assert(false);
      }
    }
  }
}

void MultiRangeManager::AcceptEvent(RangeArena *cur, RangeArenaEvent op) {
  events_.Push(EVENT(cur, op));
}

//size_t MultiRangeManager::GetEventNum() const { return events_.Size(); }

int MultiRangeManager::UserKeyCompare(const Slice &a, const Slice &b) {
  int comLength = std::min(a.size(), b.size());
  for (int i = 0; i < comLength; i++) {
    if ((uint8_t)a[i] > (uint8_t)b[i]) {
      return 1;
    } else if ((uint8_t)a[i] < (uint8_t)b[i]) {
      return -1;
    }
  }
  if (a.size() > b.size()) return 1;
  if (a.size() < b.size()) return -1;
  return 0;
}

// Get the kv length corresponding to the current buf
size_t MultiRangeManager::GetNodeKVSize(const char *buf) {
  Slice curKey = GetLengthPrefixedSlice(buf);
  Slice val = GetLengthPrefixedSlice(curKey.data() + curKey.size());
  return VarintLength(curKey.size()) + curKey.size() +
         VarintLength(val.size()) + val.size();
}

// Get the sequential linked list of all data nodes in the current memory block.
void MultiRangeManager::GenNodeList(RangeArena *current_rangearena,
                                    std::list<void *> &list, size_t &data_num) {
  InlineUserKeyIndex<const MemTableRep::KeyComparator &>::Iterator *iter =
      ((MultiRangeArena *)multi_range_)->PMemRep()->GetARTList()->GetIterator();

  Slice key = current_rangearena->CurrentBlockKeyStart();
  int len = (int)key.size() + 8 + 1;
  uint32_t keySize = VarintLength(len) + len;
  char ret[keySize];
  memset(ret, '\0', keySize);
  memset(ret + keySize - 8, 255, 8);
  char *p = EncodeVarint32(ret, len);
  memcpy(p, key.data(), key.size());

  iter->Seek(ret);
  list.clear();
  data_num = 0;
  while (iter->Valid()) {
    auto node = iter->node();
    auto cur = node;
    size_t length = this->GetNodeKVSize(cur->Key());
    // Get the size of the value corresponding to the key
    MvccKey mk;
    mk.parseKey(GetLengthPrefixedSlice(cur->Key()));
    // Comparison of byte order
    int cmp =
        UserKeyCompare(mk.userKey_, current_rangearena->CurrentBlockKeyEnd());
    if (cmp >= 0) {
      break;
    }
    list.push_back((void *)cur);
    data_num += length;
    iter->Next();
  }
}

// Write the acquired data of the sequential data link list to a new memory
// block
void MultiRangeManager::PutNodeListIntoNewBlock(RangeArena *ra,
                                                std::list<void *> &list) {
  for (auto cur : list) {
    auto curNode = static_cast<VersionNode *>(cur);

    const char *ckey = curNode->Key();
    size_t curKVSize = this->GetNodeKVSize(ckey);
    // Allocate (len) should be the same as mentable's len, and parse the value
    // corresponding to the key
    char *buff =nullptr;
    bool isFUll = ra->Allocate(curKVSize, curNode, &buff);
    assert(!isFUll);
    memcpy(buff, ckey, curKVSize);
    ra->AddData2OKList(curKVSize, buff);
    // Transform the pointer of multi-version records on the ART tree
    bool ok = curNode->CASSetKey((void *)ckey, (void *)buff);
    assert(ok);
  }
}

// Create new memblock
RangeArena *MultiRangeManager::NewBlock(size_t data_num, Slice &start,
                                        Slice &end) {
  size_t block_size = data_num + data_num * kMemBlockReservedPer / 100;
  size_t list_size =
      data_num * kMemBlockGrowPer / 100 - data_num * kMemBlockReservedPer / 100;
  return new RangeArena(block_size, list_size, start, end, nullptr);
}

// Get the node on the art tree and insert the map
//void MultiRangeManager::GenNewArtNodeSet(
//    RangeArena *currentRangeArena, std::list<void *> &list,
//    size_t &cur_data_num, std::unordered_set<void *> &nodeStoredNew) {
//  nodeStoredNew.clear();
//  list.clear();
//  cur_data_num = 0;
//  this->GenNodeList(currentRangeArena, list, cur_data_num);
//
//  for (auto cur : list) {
//    nodeStoredNew.insert(cur);
//  }
//}

size_t MultiRangeManager::exceptedNewBlockNum(size_t cur_data_num){
  size_t block_count = 1;
  size_t tmp_data_num = cur_data_num;
  size_t max_data_num = kMemBlockMaxSize / (100 + kMemBlockGrowPer) * 100;
  while (tmp_data_num >= max_data_num) {
    block_count *= 2;
    tmp_data_num /= 2;
  }
  return block_count;
}

size_t MultiRangeManager::kvDataSplit(size_t block_mid_size, std::list<void *>& list_cur_art,
  std::vector<std::list<void *>>& list_block,
  std::vector<Slice>& start, std::vector<Slice>& end,
  std::vector<size_t>& data_num){
  Slice pre_userkey;
  MvccKey mk;
  size_t ii = 0;
  //size_t block_mid_size = cur_data_num / block_count;
  for (auto cur : list_cur_art) {
    auto curNode = (VersionNode *)cur;
    Slice curSKey = GetLengthPrefixedSlice(curNode->Key());
    mk.parseKey(curSKey);

    if (data_num[ii] >= block_mid_size &&
        pre_userkey.compare(mk.userKey_) != 0) {
          end[ii] = mk.userKey_;
          ii++;
          start[ii] = mk.userKey_;
    }
    pre_userkey = mk.userKey_;
    list_block[ii].push_back(cur);
    data_num[ii] += this->GetNodeKVSize(curNode->Key());
  }
  // If the data of the same userkey is inserted continuously, ii does not
  // perform in the previous for loop++
  size_t block_count = ii + 1;

  // If the size of the last block is less than the specified minimum value and
  // it is determined to be a decomposition algorithm, no disassembly will be
  // performed to prevent the generation of too small memory blocks
  if (ii > 0 && (data_num[ii] < kBlockMinSize)) {
    for (auto data : list_block[block_count - 1]) {
      list_block[ii - 1].push_back(data);
      // Store the data in the last list into the previous list
    }
    data_num[ii - 1] += data_num[ii];
    block_count -= 1;
  }
  return block_count;
}

// Memory block expand/decompose when the memory block space is full,
// the memory block needs to be handled
void MultiRangeManager::Handle(RangeArena *current_range_arena) {
// std::cout << "MultiRangeManager::Handle begin. ";
// current_range_arena->Dump();
  // 1. Get node of art into the list
  std::list<void *> list_cur_art;
  size_t cur_data_num = 0;
  this->GenNodeList(current_range_arena, list_cur_art, cur_data_num);

  // 2.decompose into multi memblock, and consider case of multiple versions.
  // The multiple version data corresponding to the key
  // must be in the same memory block.
  size_t block_count = exceptedNewBlockNum(cur_data_num);

  std::vector<std::list<void *>> list_block(block_count);
  std::vector<size_t> data_num(block_count);
  std::vector<Slice> start(block_count);
  std::vector<Slice> end(block_count);
  for (size_t i = 0; i < block_count; i++) {
    list_block[i].clear();
    data_num[i] = 0;
  }

  block_count = this->kvDataSplit(cur_data_num / block_count, list_cur_art, list_block, start, end, data_num);

  start[0] = current_range_arena->CurrentBlockKeyStart();
  end[block_count - 1] = current_range_arena->CurrentBlockKeyEnd();

  // Create multi new memblock
  RangeArena *ra[block_count];
  for (int i = (int)block_count - 1; i >= 0; i--) {
    ra[i] = this->NewBlock(data_num[i], start[i], end[i]);
    // Store the data of the linked list in the memory block
    this->PutNodeListIntoNewBlock(ra[i], list_block[i]);
    // Modify the memory block address saved by the key in the skiplist
    if (i == 0) {
      (static_cast<MultiRangeArena *>(multi_range_))->RangeArenaChange(start[i], ra[i]);
    } else {
      (static_cast<MultiRangeArena *>(multi_range_))->RangeArenaAdd(start[i], ra[i]);
    }

    ra[i]->ChangeStatus(MemBlockState::MEMORY_BLOCK_OK);

    ((MultiRangeArena *)multi_range_)->RangeArenaQue().Push(ra[i]);

    if (closed_) {  return; }
  }

  current_range_arena->ChangeStatus(MemBlockState::MEMORY_BLOCK_IS_NO_INSERT);

  // Processing temporary data
  std::unordered_set<void *> nodeStored;
  nodeStored.clear();
  for (auto cur : list_cur_art) {
    nodeStored.insert(cur);
  }

  InlineUserKeyIndex<const MemTableRep::KeyComparator &>::Iterator *iter =
      ((MultiRangeArena *)multi_range_)->PMemRep()->GetARTList()->GetIterator();

  AtomicLinkedList<void *>& block2node = current_range_arena->Block2Node();
  void* cur = nullptr;

  // if current blocks has any allocate request without response. wait
  while(!block2node.empty()){
    if (closed_) {return;}
    if(!block2node.sweepHead(cur)){
      continue;
    }
    // already insert into new blocks. so ignored.
    if (nodeStored.find(cur) != nodeStored.end()) {
        continue;
    }

    // make sure all kv data has been copied to blocks.
    while (current_range_arena->Allocate_times() != current_range_arena->Resv_ok_times()) {
      if (closed_) {return;}
      //std::cout << "wait Node mounting to the ART tree.";
      //current_range_arena->Dump();
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }

    auto curN = (VersionNode *)cur;
    // If the cur node can be found in nodeStoredOK, but it is not on the ART
    // tree, it means it has been deleted; if it is on the ART tree, data
    // migration is performed
    bool onARTTree = true;
    iter->Seek(curN->Key());
    if (!iter->Valid() || iter->key() != curN->Key()) onARTTree = false;

    if (!onARTTree){ // is removed by deletion opertations.
      continue;
    }

    // Transform the pointer of multi-version records on the ART tree
    const char *buf = curN->Key();
    size_t length = this->GetNodeKVSize(buf);

    MvccKey mc;
    mc.parseKey(GetLengthPrefixedSlice(buf));

    void* insertRA = nullptr;
    char *buff = ((MultiRangeArena*)multi_range_)->Allocate(length, mc.userKey_, curN, &insertRA);
    memcpy(buff, buf, length);
    bool ok = curN->CASSetKey((void *)buf, buff);
    assert(ok);
    rocksdb::MultiRangeArena::AllocateOK(mc.userKey_, length, buff, insertRA);
    // mark for deletion, waiting for many seconds then really delete it.
    rocksdb::DeleteWhileNoRefs::getInstance()->markNodeForDeletion((void*)buf, DELETION_TYPE_POINTER);
  }

  void* nodeDel = nullptr;
  AtomicLinkedList<void *>& nodePushList = current_range_arena->NodePush();
  while(nodePushList.sweepHead(nodeDel)){
    rocksdb::DeleteWhileNoRefs::getInstance()->markNodeForDeletion((void*)nodeDel, DELETION_TYPE_VERSION);
  }

  // delete old memblock.
  current_range_arena->ChangeStatus(MemBlockState::MEMORY_BLOCK_IS_DELETING);
  current_range_arena->freeBlock(); // release blocks of memroy immediately, in case memory is not enough.
}
}  // namespace rocksdb
